
package com.shiku.imserver.repository.runnable;


import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.shiku.imserver.common.message.ChatMessage;
import com.shiku.imserver.repository.LastChatModel;
import com.shiku.imserver.service.IMBeanUtils;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Background task that persists the most recent chat message of each
 * conversation into the {@code shiku_lastChats} MongoDB database.
 *
 * <p>Messages are queued through {@link #putLastChat(ChatMessage)} and flushed
 * by {@link #runTask()}. Group-chat messages (chat type 2) are written to the
 * single {@code lastChats_muc} collection; one-to-one messages (chat type 1)
 * are written once for the sender and, unless sender and receiver are the same
 * id, once more for the receiver with {@code userId} and {@code jid} swapped.
 *
 * <p>The pending-message map ({@code maps}), {@code addMsg} and
 * {@code getCollectionName} are inherited and are not visible in this file.
 */
public class LastMessageDBRunnable
        extends BaseRepositoryMapRunnable<ChatMessage> {
    private static final String LASTCHAT_DB_NAME = "shiku_lastChats";
    private static final String LASTCHAT_MUC_COLLECTION = "lastChats_muc";

    /** Chat type value handled as a one-to-one conversation. */
    private static final int CHAT_TYPE_SINGLE = 1;
    /** Chat type value handled as a group (room) conversation. */
    private static final int CHAT_TYPE_ROOM = 2;

    private static final Logger log = LoggerFactory.getLogger(LastMessageDBRunnable.class);

    private final MongoDatabase lastMsgDB;
    private final MongoCollection<Document> lastMucCollection;

    /** Cache of per-user collection handles, keyed by collection name. */
    protected ConcurrentHashMap<String, MongoCollection<Document>> dbMaps = new ConcurrentHashMap<>();

    protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock();


    /**
     * Obtains the Mongo client from {@link IMBeanUtils} and resolves the
     * last-chat database and the shared group-chat collection.
     */
    public LastMessageDBRunnable() {
        MongoClient mongoClient = IMBeanUtils.getBeanManager().getMongoClient();
        this.lastMsgDB = mongoClient.getDatabase(LASTCHAT_DB_NAME);
        this.lastMucCollection = this.lastMsgDB.getCollection(LASTCHAT_MUC_COLLECTION);
    }


    /**
     * Returns the collection that stores last-chat records for the given user.
     *
     * <p>The handle is cached in {@link #dbMaps} under its collection name so
     * that later lookups for the same name reuse it.
     * NOTE(review): this assumes {@code getCollectionName} yields a bounded set
     * of names; confirm against the base class.
     *
     * @param userId id of the user whose collection is wanted
     * @return the collection handle for that user
     * @throws Exception whatever the inherited {@code getCollectionName} throws
     *                   for an id it cannot map to a collection name
     */
    public MongoCollection<Document> getMongoCollection(String userId) throws Exception {
        String collectionName = getCollectionName(userId);
        return this.dbMaps.computeIfAbsent(collectionName, name -> this.lastMsgDB.getCollection(name));
    }


    /**
     * Queues a message for the next flush. Group messages are keyed by the
     * head's {@code to} value, one-to-one messages by
     * {@code fromUserId + "_" + toUserId}; any other chat type is ignored.
     *
     * @param message the message to queue
     */
    public void putLastChat(ChatMessage message) {
        if (CHAT_TYPE_ROOM == message.getMessageHead().getChatType()) {
            addMsg(message.getMessageHead().getTo(), message);
        } else if (CHAT_TYPE_SINGLE == message.getMessageHead().getChatType()) {
            addMsg(message.getFromUserId() + "_" + message.getToUserId(), message);
        }
    }


    /**
     * Writes every pending message to MongoDB and removes it from the pending
     * map once written.
     *
     * <p>An exception from {@link #refreshLastChat(ChatMessage)} is logged and
     * ends this pass; the failing entry and everything after it stay queued.
     */
    @Override
    public void runTask() {
        try {
            Iterator<Map.Entry<String, ChatMessage>> iterator = this.maps.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, ChatMessage> entry = iterator.next();
                refreshLastChat(entry.getValue());

                // Acquire outside the try so the finally block never unlocks a
                // lock that was not actually taken.
                // NOTE(review): a message queued for this key after the write
                // above may be dropped by the removal below - verify against
                // the map implementation used by the base class.
                this.lock.writeLock().lock();
                try {
                    iterator.remove();
                } catch (Exception e) {
                    log.error(e.toString(), e);
                } finally {
                    this.lock.writeLock().unlock();
                }
            }
        } catch (Exception e) {
            log.error(e.toString(), e);
        }
    }


    /**
     * Stores {@code message} as the latest message of its conversation.
     *
     * <p>Group messages update the record matching {@code jid} in the shared
     * group collection. One-to-one messages update the sender's record and
     * then, when sender and receiver differ, the receiver's record with
     * {@code userId} and {@code jid} swapped. Messages of any other chat type
     * are ignored. A failure to resolve a user's collection is logged and the
     * method returns without writing to it.
     *
     * @param message the message to persist
     */
    public void refreshLastChat(ChatMessage message) {
        LastChatModel lastChat = toLastChat(message);
        if (lastChat == null) {
            return;
        }

        boolean room = 1 == lastChat.getIsRoom();

        MongoCollection<Document> collection = room
                ? this.lastMucCollection
                : resolveUserCollection(lastChat.getUserId(), message);
        if (collection == null) {
            return;
        }

        Document query = new Document("jid", lastChat.getJid());

        Document values = new Document("type", Integer.valueOf(lastChat.getType()));
        values.append("messageId", lastChat.getMessageId());
        values.append("timeSend", Long.valueOf(lastChat.getTimeSend()));
        values.append("content", lastChat.getContent());
        values.append("userId", lastChat.getUserId());
        values.append("jid", lastChat.getJid());
        values.append("isRoom", Integer.valueOf(lastChat.getIsRoom()));
        values.append("isEncrypt", Boolean.valueOf(lastChat.isEncrypt()));
        values.append("from", lastChat.getFrom());
        values.append("to", lastChat.getTo());
        values.append("fromUserName", lastChat.getFromUserName());
        values.append("toUserName", lastChat.getToUserName());

        if (!room) {
            query.append("userId", lastChat.getUserId());
        }

        saveOrUpdate(collection, query, values);

        // Rooms have one shared record; a message to oneself needs no mirror.
        if (room || lastChat.getUserId().equals(lastChat.getJid())) {
            return;
        }

        // Mirror the record for the receiver: swap the owner and the peer.
        query.replace("userId", lastChat.getJid());
        query.replace("jid", lastChat.getUserId());
        values.replace("userId", lastChat.getJid());
        values.replace("jid", lastChat.getUserId());

        // An insert above may have stamped an _id onto the document; drop it
        // so the receiver's record does not carry the sender's id.
        values.remove("_id");

        collection = resolveUserCollection(lastChat.getJid(), message);
        if (collection == null) {
            return;
        }

        saveOrUpdate(collection, query, values);
    }


    /**
     * Converts a message into the last-chat model, or returns {@code null}
     * when the chat type is neither one-to-one nor group.
     */
    private static LastChatModel toLastChat(ChatMessage message) {
        LastChatModel lastChat = new LastChatModel();

        if (CHAT_TYPE_ROOM == message.getMessageHead().getChatType()) {
            lastChat.setJid(message.getMessageHead().getTo());
            lastChat.setIsRoom(1);
        } else if (CHAT_TYPE_SINGLE == message.getMessageHead().getChatType()) {
            lastChat.setJid(message.getToUserId());
            lastChat.setIsRoom(0);
        } else {
            return null;
        }

        lastChat.setMessageId(message.getMessageHead().getMessageId());
        lastChat.setContent(message.getContent());
        lastChat.setUserId(message.getFromUserId());
        lastChat.setType(message.getType());
        lastChat.setTimeSend(message.getTimeSend());
        lastChat.setEncrypt(message.isEncrypt());
        lastChat.setFrom(message.getMessageHead().getFrom());
        lastChat.setTo(message.getMessageHead().getTo());
        lastChat.setFromUserName(message.getFromUserName());
        lastChat.setToUserName(message.getToUserName());
        return lastChat;
    }


    /**
     * Looks up the collection for {@code userId}, logging and returning
     * {@code null} when the lookup fails.
     */
    private MongoCollection<Document> resolveUserCollection(String userId, ChatMessage message) {
        try {
            return getMongoCollection(userId);
        } catch (NumberFormatException e) {
            log.error("error {} ===> {} ", e.getMessage(), message.toString());
            return null;
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            return null;
        }
    }


    /**
     * Updates every document matching {@code query} with {@code values}, or
     * inserts {@code values} as a new document when nothing matches.
     */
    private static void saveOrUpdate(MongoCollection<Document> collection, Document query, Document values) {
        if (0L < collection.count((Bson) query)) {
            collection.updateMany((Bson) query, (Bson) new Document("$set", values));
        } else {
            collection.insertOne(values);
        }
    }

}


